package com.wuwangfu.source;

import com.wuwangfu.entity.EventBean;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * Flink source that simulates a click stream by emitting one randomly
 * generated {@link EventBean} per second until it is cancelled.
 *
 * <p>Author: jcshen. Created: 2023-07-06.
 */
public class CustomEventSource implements SourceFunction<EventBean> {

    /** Pause between two consecutive events, in milliseconds (one event per second). */
    private static final long EMIT_INTERVAL_MS = 1000L;

    /** Candidate values for the first constructor argument of each generated event. */
    private static final String[] USERS = {"Mary", "Alice", "Bob", "Cary"};

    /** Candidate values for the second constructor argument of each generated event. */
    private static final String[] URLS = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

    /**
     * Loop flag. It is written by {@link #cancel()} and read by {@link #run}, which
     * the framework may invoke on different threads, so it must be {@code volatile}
     * for the write to become visible to the emitting loop.
     */
    private volatile boolean running = true;

    /**
     * Emits events until {@link #cancel()} is called. The source stops producing
     * data as soon as this method returns.
     *
     * @param ctx context used to hand each generated event to the framework
     * @throws Exception if the thread is interrupted while waiting between events
     */
    @Override
    public void run(SourceContext<EventBean> ctx) throws Exception {
        Random random = new Random();
        while (running) {
            ctx.collect(new EventBean(
                    USERS[random.nextInt(USERS.length)],
                    URLS[random.nextInt(URLS.length)],
                    System.currentTimeMillis()));
            // Throttle inside the loop: simulate one request per second.
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    /**
     * Requests the emitting loop in {@link #run} to stop.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
